package com.edu.realtime.app.dwd.db;


import com.edu.realtime.util.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author zcx
 * @create 2022-10-19 9:49
 * 交易域加购事务事实表
 */
public class DwdTradeCartAdd {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);
        //1.3 指定表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //TODO 2.检查点相关设置
        /*//2.1 启动检查点
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 设置检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //2.3 job取消后，检查点是否保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 设置两个检查点之间的最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        //2.5 设置重启策略
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(30),Time.seconds(3)));
        //2.6 设置状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://nameservice1:8020/edu/ck");
        //2.7 设置操作hadoop的用户
        System.setProperty("HADOOP_USER_NAME","root");*/

        //TODO 3.从topic_db中读取业务数据 映射为动态表
        tableEnv.executeSql(MyKafkaUtil.getTopicDbDDL("dwd_trade_cart_add_group"));

        //TODO 4.过滤出加购行为
        Table cartAdd = tableEnv.sqlQuery("select\n" +
                "`data`['id'] id,\n" +
                "`data`['user_id'] user_id,\n" +
                "ts\n" +
                "from topic_db\n" +
                "where `table`='cart_info'\n" +
                "and `type`='insert'");
        tableEnv.createTemporaryView("cart_add",cartAdd);

        //TODO 5.写入kafka对应主题
        //5.1 创建动态表和要写入的主题进行映射
        tableEnv.executeSql("CREATE TABLE dwd_trade_cart_add (\n" +
                "  id string,\n" +
                "  user_id string,\n" +
                "  ts string,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") " + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_cart_add"));

        //5.2 写入
        tableEnv.executeSql("insert into dwd_trade_cart_add select * from cart_add");

    }
}
